iT邦幫忙

2024 iThome 鐵人賽

DAY 9
0
AI/ ML & Data

自動交易程式探索系列 第 9

Day 9 - 訓練DRL做動態的資產配置 (2/2)

  • 分享至 

  • xImage
  •  

參考資料
FinRL_PortfolioAllocation_NeurIPS_2020.ipynb
FinRL_PortfolioAllocation_NeurIPS_2020.py

回測

改變 MVO 策略的計算方法

為了更準確地比較 MVO 策略和 DRL 模型的表現,教學原文這邊修改了 MVO 的計算方法,使其與 DRL 模型的動態特性相匹配。

  1. max_sharpe()改為min_volatility(),以最小化波動率。
  2. 將 MVO 策略改為每日再平衡,以便與 DRL 模型的動態調整相對應。

    前篇文章中的 MVO 策略是一次性計算並持有至到期,但因為與 portfolio allocation 需動態調整資產配置的目的不符,因此修改。

  3. MVO 此處的單一資產最大占比為 0.1,以避免過度集中投資。

    前篇文章中的 MVO 策略單一資產的最大占比為 0.5,可能導致過度集中投資,總之就是可能的獲利較大但風險也較大。

以下是修改後的 MVO 計算函數:

def calculate_daily_rebalanced_mvo_portfolio(df, unique_trade_dates, initial_capital=1000000):
    """
    通過每日再平衡計算最小方差組合(MVO)。

    參數:
    - df:包含股票數據的 DataFrame,需要包含 'date'、'tic'、'close' 和 'return_list' 列。
    - unique_trade_dates:交易日期的列表或數組。
    - initial_capital:初始資金,默認為 1,000,000。

    返回:
    - portfolio:包含賬戶價值隨時間變化的 DataFrame。
    """
    from pypfopt.efficient_frontier import EfficientFrontier
    import numpy as np

    # 初始化投資組合 DataFrame
    portfolio = pd.DataFrame(index=range(1), columns=unique_trade_dates)
    portfolio.loc[0, unique_trade_dates[0]] = initial_capital

    # 遍歷交易日期,計算每日投資組合價值
    for i in range(len(unique_trade_dates) - 1):
        current_date = unique_trade_dates[i]
        next_date = unique_trade_dates[i + 1]

        # 獲取當前日期和下一日期的數據
        df_current = df[df.date == current_date].reset_index(drop=True)
        df_next = df[df.date == next_date].reset_index(drop=True)

        # 計算當前日期的協方差矩陣
        Sigma = df_current['return_list'][0].cov()

        # 使用最小化波動率的方法計算權重
        ef = EfficientFrontier(None, Sigma, weight_bounds=(0, 0.1))
        ef.min_volatility()
        cleaned_weights = ef.clean_weights()

        # 計算每個資產的資金分配和持有數量
        cap = portfolio.iloc[0, i]
        weights = np.array(list(cleaned_weights.values()))
        current_cash_allocation = weights * cap
        current_prices = df_current['close'].values
        current_shares = current_cash_allocation / current_prices

        # 計算下一日期的投資組合價值
        next_prices = df_next['close'].values
        portfolio_value_next = np.dot(current_shares, next_prices)

        # 存儲投資組合價值
        portfolio.iloc[0, i + 1] = portfolio_value_next

    # 轉置 DataFrame 並命名列
    portfolio = portfolio.T
    portfolio.columns = ['account_value']

    return portfolio

通過上述修改,MVO 策略每天重新計算最優權重並再平衡,與 DRL 模型的動態調整相對應。

計算基準績效(Baseline)

在對 DRL 模型進行回測之前,我們需要計算基準策略的績效,以便進行比較。這裡我們使用道瓊斯工業平均指數(DJI)和最小方差組合(MVO)作為基準。

### 獲取基準數據
dji_daily_return = calculate_dji_daily_return(TRADE_START_DATE, END_DATE)
unique_trade_date = trade['date'].unique()
mvo_portfolio = calculate_daily_rebalanced_mvo_portfolio(df, unique_trade_date)
mvo_daily_return = calculate_daily_return_from_portfolio(mvo_portfolio)

驗證績效

# 初始化交易環境
e_trade_gym = StockPortfolioEnv(df=trade, **env_kwargs)

# 進行回測
backtest_results = backtest_drl(e_trade_gym, trained_models)

# 繪製累計收益曲線
trade['date'] = pd.to_datetime(trade['date'])
time_ind = pd.Series(trade['date'].unique()).sort_values()
plot_cumulative_returns(backtest_results, time_ind, min_var_daily_return=mvo_daily_return, dji_daily_return=dji_daily_return)

累計收益曲線

結果分析

在動態修改資產配置的情況下,DRL 與 MVO 策略和 DJI 的累計收益曲線比較如下:

修改後的累計收益曲線

測試訓練數據和驗證數據不重疊的影響

因為教學原文刻意讓訓練數據區間與回測數據區間重疊,所以這邊特別實驗了一下完全不重疊的情況,評估一下模型的泛化能力如何。
我將訓練數據區間改為 '2009-01-01''2019-12-31',交易(回測)數據區間為 '2020-01-01''2021-10-31'
重新訓練模型並進行回測後,發現模型的績效有所下降,但其實也沒有下降非常多。
看起來 DRL 的模型還是有一定的泛化能力,但是在實際應用中,我最擔心的還是模型的泛化能力,畢竟資產數據量怎麼樣都不可能像圍棋那樣要多少有多少。

非重疊數據的累計收益曲線

完整的代碼

from stable_baselines3 import A2C, DDPG, PPO, SAC, TD3
from finrl.agents.stablebaselines3.models import DRLAgent
import plotly.graph_objs as go
import pandas as pd
import os

import quantstats as qs


def train_drl(e_trade_gym, models_info):
    """
    Function to train deep reinforcement learning (DRL) models.
    
    Parameters:
    - e_trade_gym: The trading environment for backtesting.
    - models_info: A dictionary containing the model class as keys and corresponding training parameters and paths as values.
    
    Returns:
    - trained_models: A dictionary containing the trained models.
    """
    env_train, _ = e_trade_gym.get_sb_env()
    # Initialize DRLAgent
    agent = DRLAgent(env=env_train)

    # Dictionary to store the trained models
    trained_models = {}

    # Loop through each model class and its associated information
    for model_class, info in models_info.items():
        model_path = info["save_path"]

        if os.path.exists(model_path):
            print(f"正在從 {model_path} 加載現有的 {model_class.__name__} 模型")

            # Load the model using stable-baselines3
            try:
                model = model_class.load(model_path, env=env_train)
                trained_models[model_class.__name__] = model
                print(f"{model_class.__name__} 模型加載成功。")
            except Exception as e:
                print(f"加載 {model_class.__name__} 模型失敗: {e}")
                print(f"將繼續訓練 {model_class.__name__} 模型。")

                # Train the model if loading fails
                model = agent.get_model(
                    model_name=model_class.__name__.lower(),
                    model_kwargs=info["params"])
                trained_model = agent.train_model(
                    model=model,
                    tb_log_name=model_class.__name__.lower(),
                    total_timesteps=info["total_timesteps"])
                trained_model.save(model_path)
                trained_models[model_class.__name__] = trained_model
                print(f"{model_class.__name__} 模型已訓練並保存到 {model_path}")
        else:
            print(f"正在訓練 {model_class.__name__} 模型...")
            model = agent.get_model(model_name=model_class.__name__.lower(),
                                    model_kwargs=info["params"])
            trained_model = agent.train_model(
                model=model,
                tb_log_name=model_class.__name__.lower(),
                total_timesteps=info["total_timesteps"])
            trained_model.save(model_path)
            trained_models[model_class.__name__] = trained_model
            print(f"{model_class.__name__} 模型已訓練並保存到 {model_path}")

    return trained_models


def backtest_drl(e_trade_gym, trained_models):
    """
    Function to backtest all trained DRL models.
    
    Parameters:
    - e_trade_gym: The trading environment for backtesting.
    - trained_models: Dictionary of trained models.
    
    Returns:
    - backtest_results: Dictionary containing daily returns and actions for each model.
    """
    # 初始化 backtest 結果的字典
    backtest_results = {}

    # 對每個模型進行回測
    for model_name, model in trained_models.items():
        print(f"正在對 {model_name} 模型進行回測...")

        # 使用該模型進行預測
        df_daily_return, df_actions = DRLAgent.DRL_prediction(
            model=model, environment=e_trade_gym)

        # 儲存回測結果
        backtest_results[model_name] = {
            'daily_return': df_daily_return,
            'actions': df_actions
        }

        # 輸出回測的前幾行結果作為確認
        print(f"{model_name} 模型的每日回報:")
        print(df_daily_return.head())
        print(f"{model_name} 模型的動作:")
        print(df_actions.head())

    return backtest_results


def plot_html(trace_list, time_ind):
    fig = go.Figure()
    for trace in trace_list:
        fig.add_trace(trace)

    fig.update_layout(legend=dict(x=0,
                                  y=1,
                                  traceorder="normal",
                                  font=dict(family="sans-serif",
                                            size=15,
                                            color="black"),
                                  bgcolor="White",
                                  bordercolor="white",
                                  borderwidth=2), )
    #fig.update_layout(legend_orientation="h")
    fig.update_layout(
        title={
            #'text': "Cumulative Return using FinRL",
            'y': 0.85,
            'x': 0.5,
            'xanchor': 'center',
            'yanchor': 'top'
        })
    #with Transaction cost
    #fig.update_layout(title =  'Quarterly Trade Date')
    fig.update_layout(
        #    margin=dict(l=20, r=20, t=20, b=20),
        paper_bgcolor='rgba(1,1,0,0)',
        plot_bgcolor='rgba(1, 1, 0, 0)',
        #xaxis_title="Date",
        yaxis_title="Cumulative Return",
        xaxis={
            'type': 'date',
            'tick0': time_ind[0],
            'tickmode': 'linear',
            'dtick': 86400000.0 * 80
        })
    fig.update_xaxes(showline=True,
                     linecolor='black',
                     showgrid=True,
                     gridwidth=1,
                     gridcolor='LightSteelBlue',
                     mirror=True)
    fig.update_yaxes(showline=True,
                     linecolor='black',
                     showgrid=True,
                     gridwidth=1,
                     gridcolor='LightSteelBlue',
                     mirror=True)
    fig.update_yaxes(zeroline=True,
                     zerolinewidth=1,
                     zerolinecolor='LightSteelBlue')

    fig.show()


def plot_cumulative_returns(backtest_results,
                            time_ind,
                            min_var_daily_return=None,
                            dji_daily_return=None):
    """
    Function to plot cumulative returns for DRL models, Min-Variance, and DJI.
    
    Parameters:
    - backtest_results: Dictionary containing daily returns for different DRL models.
    - time_ind: Pandas Series of dates (x-axis values).
    - min_var_daily_return: (Optional) Daily returns for the Min-Variance model. Default is None.
    - dji_daily_return: (Optional) Daily returns for the DJI index. Default is None.
    """

    # Initialize the figure
    trace_list = []

    # Loop through backtest results and add traces for each model's cumulative returns
    for model_name, result in backtest_results.items():
        df_daily_return = result['daily_return']
        # 計算累積回報 (Cumulative Returns)
        cumpod = (df_daily_return['daily_return'] + 1).cumprod() - 1

        # Add trace for each model's cumulative return
        trace = go.Scatter(x=time_ind,
                           y=cumpod,
                           mode='lines',
                           name=f'{model_name} (Portfolio Allocation)')
        trace_list.append(trace)

    # Conditionally add Min-Variance cumulative return trace if provided
    if min_var_daily_return is not None:
        # 計算MVO累積回報
        min_var_cumpod = (min_var_daily_return + 1).cumprod() - 1
        trace_min_var = go.Scatter(x=time_ind,
                                   y=min_var_cumpod,
                                   mode='lines',
                                   name='Min-Variance')
        trace_list.append(trace_min_var)

    # Conditionally add DJI cumulative return trace if provided
    if dji_daily_return is not None:
        # 計算DJI累積回報
        dji_cumpod = (dji_daily_return + 1).cumprod() - 1
        trace_dji = go.Scatter(x=time_ind,
                               y=dji_cumpod,
                               mode='lines',
                               name='DJIA')
        trace_list.append(trace_dji)

    plot_html(trace_list, time_ind)


def generate_quantstats_report(daily_return_df, output_filename, title):
    """
    Function to generate a QuantStats HTML report.

    Parameters:
    - daily_return_df: Pandas DataFrame containing 'date' and 'daily_return' columns.
    - output_filename: The name of the output HTML file for the report.
    - title: The title for the report.

    Returns:
    - None. The report will be saved as an HTML file.
    """

    # 確保 'date' 列是 datetime 格式,並將其設置為索引
    daily_return_df['date'] = pd.to_datetime(daily_return_df['date'])
    daily_return_df.set_index('date', inplace=True)

    # 確保 'daily_return' 是 pandas Series
    daily_return_series = daily_return_df['daily_return']

    # 使用 quantstats 生成 HTML 報告,將其保存為文件
    qs.reports.html(daily_return_series, output=output_filename, title=title)
    qs.reports.plots(daily_return_series)

    print(f"Quantstats HTML 報告已生成並保存為 '{output_filename}'。")


def calculate_dji_daily_return(start_date, end_date):
    """
    Calculate daily returns for the DJI index.

    Parameters:
    - start_date: The start date for fetching DJI data.
    - end_date: The end date for fetching DJI data.

    Returns:
    - baseline_returns: Pandas Series containing daily returns for the DJI index.
    """
    from finrl.plot import get_baseline, get_daily_return
    # Fetch baseline DJI data
    baseline_df = get_baseline(ticker='^DJI', start=start_date, end=end_date)

    # Calculate daily returns for DJI
    baseline_returns = get_daily_return(baseline_df, value_col_name="close")

    return baseline_returns


def calculate_daily_rebalanced_mvo_portfolio(df,
                                             unique_trade_dates,
                                             initial_capital=1000000):
    """
    通过每日再平衡计算最小方差组合(MVO)。

    参数:
    - df:包含股票数据的 DataFrame,需要包含 'date'、'tic'、'close' 和 'return_list' 列。
    - unique_trade_dates:交易日期的列表或数组。
    - initial_capital:初始资金,默认为 1,000,000。

    返回:
    - portfolio:包含账户价值随时间变化的 DataFrame。
    """
    from pypfopt.efficient_frontier import EfficientFrontier
    import numpy as np

    # 初始化投资组合 DataFrame
    portfolio = pd.DataFrame(index=range(1), columns=unique_trade_dates)
    portfolio.loc[0, unique_trade_dates[0]] = initial_capital

    # 遍历交易日期,计算每日投资组合价值
    for i in range(len(unique_trade_dates) - 1):
        current_date = unique_trade_dates[i]
        next_date = unique_trade_dates[i + 1]

        # 获取当前日期和下一日期的数据
        df_current = df[df.date == current_date].reset_index(drop=True)
        df_next = df[df.date == next_date].reset_index(drop=True)

        # 计算当前日期的协方差矩阵
        Sigma = df_current['return_list'][0].cov()

        # 使用最小化波动率的方法计算权重
        ef = EfficientFrontier(None, Sigma, weight_bounds=(0, 0.1))
        ef.min_volatility()
        cleaned_weights = ef.clean_weights()

        # 计算每个资产的资金分配和持有数量
        cap = portfolio.iloc[0, i]
        weights = np.array(list(cleaned_weights.values()))
        current_cash_allocation = weights * cap
        current_prices = df_current['close'].values
        current_shares = current_cash_allocation / current_prices

        # 计算下一日期的投资组合价值
        next_prices = df_next['close'].values
        portfolio_value_next = np.dot(current_shares, next_prices)

        # 存储投资组合价值
        portfolio.iloc[0, i + 1] = portfolio_value_next

    # 转置 DataFrame 并命名列
    portfolio = portfolio.T
    portfolio.columns = ['account_value']

    return portfolio


def calculate_daily_return_from_portfolio(portfolio):
    """
    Calculate daily returns from a portfolio's account value.

    Parameters:
    - portfolio: DataFrame containing the portfolio's account value over time.

    Returns:
    - daily_return: Pandas Series containing daily returns.
    """
    # 確保 'account_value' 列存在於 portfolio 中
    if 'account_value' not in portfolio.columns:
        raise ValueError("Portfolio must contain 'account_value' column")

    # 計算每日回報
    daily_return = portfolio['account_value'].pct_change().dropna()

    return daily_return


def calculate_cov_list(df, lookback=252):
    """
    Calculate cov_list and return_list from price data.

    Parameters:
    - df: The dataframe containing the price data.
    - lookback: The lookback period for calculating the covariance (default is 252, which represents a year).

    Returns:
    - df: DataFrame with the calculated cov_list and return_list.
    """
    df = df.sort_values(['date', 'tic'], ignore_index=True)
    df.index = df.date.factorize()[0]

    cov_list = []
    return_list = []

    for i in range(lookback, len(df.index.unique())):
        data_lookback = df.loc[i - lookback:i, :]
        price_lookback = data_lookback.pivot_table(index='date',
                                                   columns='tic',
                                                   values='close')
        return_lookback = price_lookback.pct_change().dropna()
        return_list.append(return_lookback)

        # 計算協方差矩陣
        covs = return_lookback.cov().values
        cov_list.append(covs)

    # 將協方差矩陣加入 df
    df_cov = pd.DataFrame({
        'date': df.date.unique()[lookback:],
        'cov_list': cov_list,
        'return_list': return_list
    })
    df = df.merge(df_cov, on='date')
    df = df.sort_values(['date', 'tic']).reset_index(drop=True)

    return df


def main():
    from finrl import config
    from finrl import config_tickers
    from finrl.meta.data_processors.processor_yahoofinance import YahooFinanceProcessor
    from finrl.meta.env_portfolio_allocation.env_portfolio import StockPortfolioEnv
    from finrl.meta.preprocessor.preprocessors import FeatureEngineer, data_split
    # 設定環境
    TIME_INTERVAL = '1D'
    START_DATE = '2008-01-01'
    END_DATE = '2021-10-31'
    RAW_DATA_PATH = f'./datasets/DOW30_{TIME_INTERVAL}_{START_DATE}_to_{END_DATE}.csv'

    TRAIN_START_DATE = '2009-01-01'
    TRAIN_END_DATE = '2019-12-31'
    TRADE_START_DATE = '2020-01-01'

    ## Part 3. Download Data
    print(config_tickers.DOW_30_TICKER)
    if not os.path.exists(RAW_DATA_PATH):
        dp = YahooFinanceProcessor()
        df = dp.download_data(start_date=START_DATE,
                              end_date=END_DATE,
                              ticker_list=config_tickers.DOW_30_TICKER,
                              time_interval=TIME_INTERVAL)
        df.to_csv(RAW_DATA_PATH)
    else:
        df = pd.read_csv(RAW_DATA_PATH)
    if 'date' not in df.columns and 'timestamp' in df.columns:
        df = df.rename(columns={'timestamp': 'date'})

    ### 特徵工程
    """
        後面初始化StockPortfolioEnv時 "tech_indicator_list": config.INDICATORS
        會需要指定使用的技術指標,
        所以這裡先對數據進行特徵工程, 取得技術指標
    """
    fe = FeatureEngineer(use_technical_indicator=True,
                         use_turbulence=False,
                         user_defined_feature=False)

    df = fe.preprocess_data(df)

    # 計算協方差矩陣
    df = calculate_cov_list(df)

    # 獲取訓練數據 (假設你已經準備好數據)
    # 注意,你需要根據實際情況調用數據處理過程,例如 data_split 或其他
    train, trade = data_split(df, TRAIN_START_DATE,
                              TRAIN_END_DATE), data_split(
                                  df, TRADE_START_DATE, END_DATE)

    # 定義環境參數
    stock_dimension = len(train.tic.unique())
    state_space = stock_dimension
    print(f"Stock Dimension: {stock_dimension}, State Space: {state_space}")

    env_kwargs = {
        "hmax": 100,
        "initial_amount": 1000000,
        "transaction_cost_pct": 0.001,
        "state_space": state_space,
        "stock_dim": stock_dimension,
        "tech_indicator_list": config.INDICATORS,
        "action_space": stock_dimension,
        "reward_scaling": 1e-4
    }

    # 初始化環境
    e_train_gym = StockPortfolioEnv(df=train, **env_kwargs)

    # 定義模型類別映射和參數
    models_info = {
        A2C: {
            "params": {
                "n_steps": 5,
                "ent_coef": 0.005,
                "learning_rate": 0.0002
            },
            "total_timesteps": 50000,
            "save_path": './training/portfolio_allocation/trained_a2c.zip'
        },
        PPO: {
            "params": {
                "n_steps": 2048,
                "ent_coef": 0.005,
                "learning_rate": 0.0001,
                "batch_size": 128,
            },
            "total_timesteps": 80000,
            "save_path": './training/portfolio_allocation/trained_ppo.zip'
        },
        DDPG: {
            "params": {
                "batch_size": 128,
                "buffer_size": 50000,
                "learning_rate": 0.001
            },
            "total_timesteps": 50000,
            "save_path": './training/portfolio_allocation/trained_ddpg.zip'
        },
        SAC: {
            "params": {
                "batch_size": 128,
                "buffer_size": 100000,
                "learning_rate": 0.0003,
                "learning_starts": 100,
                "ent_coef": "auto_0.1",
            },
            "total_timesteps": 50000,
            "save_path": './training/portfolio_allocation/trained_sac.zip'
        },
        TD3: {
            "params": {
                "batch_size": 100,
                "buffer_size": 1000000,
                "learning_rate": 0.001
            },
            "total_timesteps": 30000,
            "save_path": './training/portfolio_allocation/trained_td3.zip'
        }
    }

    ### get baseline
    dji_daily_return = calculate_dji_daily_return(TRADE_START_DATE, END_DATE)
    unique_trade_date = trade['date'].unique()
    mvo_portfolio = calculate_daily_rebalanced_mvo_portfolio(
        df, unique_trade_date)
    mvo_daily_return = calculate_daily_return_from_portfolio(mvo_portfolio)

    # 訓練模型
    trained_models = train_drl(e_train_gym, models_info)
    # 初始化交易環境
    e_trade_gym = StockPortfolioEnv(df=trade, **env_kwargs)

    # 進行回測
    backtest_results = backtest_drl(e_trade_gym, trained_models)

    # 假設 backtest_results 來自 backtest_drl 函數
    trade['date'] = pd.to_datetime(trade['date'])
    time_ind = pd.Series(trade['date'].unique()).sort_values()
    plot_cumulative_returns(backtest_results,
                            time_ind,
                            min_var_daily_return=mvo_daily_return,
                            dji_daily_return=dji_daily_return)

    print(backtest_results)

    # 假設你想用 PPO 模型的回報生成 quantstats 報告
    ppo_daily_return = backtest_results['PPO']['daily_return']
    generate_quantstats_report(ppo_daily_return, "PPO_performance_report.html",
                               "PPO Performance Report")


if __name__ == "__main__":
    main()

上一篇
Day 8 - 訓練DRL做動態的資產配置 (1/2)
下一篇
Day 10 - 繪製圖表
系列文
自動交易程式探索12
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言